package com.kaigejava.rocketmq.maindemo.product.filter;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

/**
 * @author 凯哥Java
 * @description 过滤消息之sql语法过滤的生产者
 * @company
 * @since 2022/10/19 14:09
 */
public class SqlFilterMessageProducer {
    public static void main(String[] args) throws Exception {
        //1：创建消息生产者producer,并指定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2：制定nameserver地址
        producer.setNamesrvAddr("192.168.50.132:9876");
        //设置发送超时时间：
        producer.setSendMsgTimeout(10000);

        //3：启动prodicer
        producer.start();
        //4：创建消息对象，指定主题Topic、Tag和消息体
        for (int i = 0; i < 10; i++) {
            Message message = new Message("sql-filter-topic","SqlTag0",("from sql-filter-topic-main"+i).getBytes());
            //设置自定义属性-sql语法过滤的时候，根据此字段进行过滤
            message.putUserProperty("i",String.valueOf(i));
            //5：发送消息
            SendResult result = producer.send(message,60000);
            SendStatus status = result.getSendStatus();
            String msgId = result.getMsgId();
            int queueId = result.getMessageQueue().getQueueId();
            String offsetMegId = result.getOffsetMsgId();
            long offset = result.getQueueOffset();
            String sendResultMsg = "过滤消息第"+i+"个发送状态："+status+"\t"+"消息id:"+msgId+"\t 消费者队列id:"+queueId +"\t offsetMegId:"+offsetMegId+"\t offset:"+offset;
            System.out.println(sendResultMsg);
        }
        //6：关闭生产者producer
        producer.shutdown();
    }
}
